package com.tt.notify.consumer;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.tt.notify.common.config.MQConfiguration;
import com.tt.notify.common.constant.MQ;
import com.tt.notify.common.constant.MQ.ConsumeType;
import com.tt.notify.common.constant.MQ.Tag;
import com.tt.notify.common.constant.MQ.Topic;
import com.tt.notify.common.exception.MQException;
import com.tt.notify.consumer.listener.MQMessageListener;
import com.tt.notify.consumer.listener.MQMessageWrapper;


/**
 * Message consumer.
 *
 * <p>On bean initialization this component reads the topic, tag list and consume
 * type from the {@link MQMessageListener} exposed by the injected
 * {@link MQMessageWrapper}, then creates, configures and starts one
 * {@link DefaultMQPushConsumer} for that topic. Started consumers are shut down
 * when the bean is destroyed.
 *
 * @author  liuhaihui
 * @date    2017-07-13 19:31:37
 * @version
 */
@Component
class MQConsumer {

	private static final Logger logger = LoggerFactory.getLogger(MQConsumer.class);

	/** Separator placed between tag names to build a multi-tag subscription expression. */
	private static final String TAG_SEPARATOR = " || ";

	/**
	 * Injected MQ configuration (supplies the name server address).
	 */
	@Autowired
	private MQConfiguration config;

	/** Wrapper registered as the RocketMQ message listener; also exposes the business listener. */
	@Autowired
	private MQMessageWrapper rocketMqMessageWrapper;

	/** Consumers started by {@link #init()}, keyed by topic, kept so they can be shut down later. */
	private final ConcurrentHashMap<MQ.Topic, DefaultMQPushConsumer> defaultMQPushConsumerMap
		= new ConcurrentHashMap<MQ.Topic, DefaultMQPushConsumer>();

	/**
	 * Validates the listener configuration, then creates and starts the push consumer.
	 *
	 * <p>All validation happens before any RocketMQ client object is created, so a
	 * misconfigured listener fails fast with an {@link MQException}.
	 *
	 * @throws MQClientException if subscribing or starting the consumer fails
	 * @throws MQException if the listener is missing, or its topic, tag list or
	 *         consume type is missing/invalid
	 */
	@PostConstruct
	public void init() throws MQClientException {
		// Check the wrapper and listener BEFORE dereferencing them; otherwise a missing
		// listener surfaces as a bare NullPointerException instead of this message.
		MQMessageListener listener =
				rocketMqMessageWrapper == null ? null : rocketMqMessageWrapper.getRocketMqMessageListener();
		if (listener == null) {
			throw new MQException("如果业务系统要消费消息，请务必在业务系统实现RocketMqMessageListener接口!");
		}

		Topic topic = listener.getTopic();
		if (topic == null) {
			throw new MQException("请在RocketMqMessageListener实现类中指定topic");
		}

		List<Tag> tagList = listener.getTagList();
		if (tagList == null || tagList.isEmpty()) {
			throw new MQException("请在RocketMqMessageListener实现类中指定一个或多个tag");
		}

		MessageModel messageModel = resolveMessageModel(listener.getMQConsumeType());

		String topicName = topic.getName();
		String tagName = joinTagNames(tagList);
		String groupName = topic.getProducerAndConsumerGroupName();

		// [1] Consumer group this consumer belongs to.
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);

		// [2] Note from the original author: do NOT call setInstanceName(...) here. With an
		// instance name set, a single point-to-point consumer worked, but running 2-3
		// consumers left some of them unable to consume.
		consumer.setNamesrvAddr(config.getNamesrvAddr());

		// [4] Deliver messages to the listener one at a time. Per the original author,
		// batches are avoided because there is no per-message manual ack, so a batch of
		// three with one failure could not be acknowledged correctly.
		consumer.setConsumeMessageBatchMaxSize(1);

		// Topic and tag expression this consumer subscribes to.
		consumer.subscribe(topicName, tagName);

		// Where to begin on the consumer's first start (head of the queue here); per the
		// original author's note, later starts resume from the last consumed position.
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

		// [3] Broadcasting vs. clustering, as requested by the listener.
		consumer.setMessageModel(messageModel);

		consumer.registerMessageListener(rocketMqMessageWrapper);

		// Original author's hint, left disabled: setVipChannelEnabled(false) was suggested
		// to avoid failing to receive messages.

		consumer.start();
		defaultMQPushConsumerMap.put(topic, consumer);

		logger.info("消费者:consumerGroup={},namesrvAddr={}", groupName, config.getNamesrvAddr());
		logger.info("消费者启动成功|监听TOPIC:{},TAG:{}|RocketMQ Client start success", topicName, tagName);
	}

	/**
	 * Shuts down every consumer started by {@link #init()}.
	 *
	 * <p>A failure while shutting down one consumer is logged and does not prevent the
	 * remaining consumers from being shut down.
	 */
	@PreDestroy
	public void destroy() {
		for (Map.Entry<MQ.Topic, DefaultMQPushConsumer> entry : defaultMQPushConsumerMap.entrySet()) {
			try {
				entry.getValue().shutdown();
			} catch (RuntimeException e) {
				logger.warn("Failed to shut down consumer for topic {}", entry.getKey().getName(), e);
			}
		}
		defaultMQPushConsumerMap.clear();
	}

	/**
	 * Maps the project-level consume type to the RocketMQ message model.
	 *
	 * @param consumeType consume type declared by the listener
	 * @return {@link MessageModel#BROADCASTING} for broadcast,
	 *         {@link MessageModel#CLUSTERING} for point-to-point
	 * @throws MQException if the consume type is neither of the two (including {@code null})
	 */
	private static MessageModel resolveMessageModel(ConsumeType consumeType) {
		if (consumeType == MQ.ConsumeType.BOARDCAST) {
			return MessageModel.BROADCASTING;
		}
		if (consumeType == MQ.ConsumeType.POINTTOPOINT) {
			return MessageModel.CLUSTERING;
		}
		throw new MQException("消费类型错误！既不是广播模式，也不是点对点模式！");
	}

	/**
	 * Joins the tag names with {@code " || "} to form a subscription expression.
	 * A single tag yields just its name.
	 *
	 * @param tagList tags to join; must be non-null
	 * @return the joined expression, or an empty string for an empty list
	 */
	private static String joinTagNames(List<Tag> tagList) {
		StringBuilder expression = new StringBuilder();
		for (int i = 0; i < tagList.size(); i++) {
			if (i > 0) {
				expression.append(TAG_SEPARATOR);
			}
			expression.append(tagList.get(i).getName());
		}
		return expression.toString();
	}

	/**
	 * Manual scratch harness: prints the loop counters and the joined tag expression
	 * for three sample tags.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		List<MQ.Tag> tagList = Arrays.asList(new MQ.Tag[]{MQ.Tag.TAG_DOG_WANGWANG,MQ.Tag.TAG_DOG_XIAOHEI,MQ.Tag.TAG_DOG_BAIMAO});
		int tagCount = tagList.size();
		for (int k = 0; k < tagCount; k++) {
			System.out.println("k: " + k + ", tagCount:" + tagCount);
		}
		System.out.println(joinTagNames(tagList));
	}

}
